package com.niit.kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.protocol.types.Field;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class KafkaConsumerTest {

    public static void main(String[] args) {
        //1.创建连接Kafka的配置
        Properties props = new Properties();
        //1.1配置Kafka主机信息
        props.put("bootstrap.servers","node1:9092");
        //1.2设置消费者组，每一个消费者都需要指定一个消费者组。如果消费者的组名是相同的，表示这几个消费者都是同一组的
        props.put("group.id","T2");
        //自动提交offet
        props.put("enable.auto.commit","true");
        //自动提交offset的时间间隔
        props.put("auto.commit,interval.ms","1000");
        //props.put("auto.offset.reset","earliest");//从最旧的数据进行消费
        //拉去key/value数据进行反序列化。 发送数据时候将数据进行 序列化（将字符串变成字节数据）  接收数据的时候就要反序列化（字节数据变成字符串）
        props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");

        //2.创建Kafka消费者
        KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(props);

        //3.订阅要消费的主题
        consumer.subscribe(Arrays.asList("BD2_1"));

        //4.不断从kafka拉取数据
        while (true){
            //4.1消费者一次拉取一批的数据
            ConsumerRecords<String,String> consumerRecords = consumer.poll(Duration.ofSeconds(5));
            //4.2拉取出来的数据进行打印
            for (ConsumerRecord<String,String> data:consumerRecords) {
                //获取主题
                String topic = data.topic();
                //获取偏移量
                long offset = data.offset();
                //获取key/value
                String key = data.key();
                String value = data.value();

                System.out.println("主题："+topic+",偏移量："+offset+",键："+key+",值："+value);

            }
        }

    }
}
